6. Hadoop HA

Zookeeper 集群搭建

ZK 的写流程,客户端可以连接任意的 zkserver 实例,向 server 发送写请求命令,如果当前连接的 server 不是Leader,server 会将写命令发送给 Leader,Leader 将写操作命令广播到集群的其他节点,所有节点都执行写操作命令,一旦集群中半数以上的节点写数据成功,Leader 会响应当前 Server,让当前 Server 响应客户端,写操作完成。

集群搭建流程可参考之前 文章), 启动后效果如下:

Hadoop HA

之前搭建 hadoop 集群的时候,NN 和 RM 都只有一个节点,那么实现 hadoop 的 HA,必须保证在 NN 和 RM 故障时,采取容错机制,可以让集群继续使用。

HDFS HA

元数据同步过程

NN 的高可用中元数据的同步过程为,在 active【使用 active 状态来标记主节点,使用 standby 状态标记备用节点】的 NN 格式化后,将空白的 fsimage 文件拷贝到所有的 NN 的机器上,active 的 NN 在启动后,将 edits 文件中的内容发送给 Journalnode 进程,standby 状态的 NN 主动从 Journalnode 进程拷贝数据,保证元数据的同步。Journalnode 在设计时,采用 paxos 协议, Journalnode 适合在奇数台机器上启动,在 hadoop 中,要求至少需要3个 Journalnode 进程,如果开启了 hdfs 的 ha, 就不能再启动 2NN。在同一时刻,最多只能有一个 NN 作为主节点,对外提供服务,其余的 NN,都作为备用节点,不对外提供服务。

搭建过程

1. 修改 core-site.xml 中的 fs.defaultFS 地址

1
2
3
4
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>

mycluster 是自定义的集群名称

2. 修改 hdfs-site.xml 文件,配置 N 个 NN 运行的主机和端口。配置 JournalNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
<!-- 完全分布式集群名称 -->
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>

<!-- 集群中NameNode节点都有哪些 -->
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>

<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>hadoop10:9000</value>
</property>

<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>hadoop11:9000</value>
</property>

<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>hadoop10:50070</value>
</property>

<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>hadoop11:50070</value>
</property>

<!-- 指定NameNode元数据在JournalNode上的存放位置(JournalNode 至少三个) -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop10:8485;hadoop11:8485;hadoop12:8485/mycluster</value>
</property>

<!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应。当发生自动故障转移的时候,使用 ssh 发送命令的方式来杀死之前的服务,防止脑裂的情况 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>

<!-- 使用隔离机制时需要ssh无秘钥登录 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/rexyan/.ssh/id_rsa</value>
</property>

<!-- 声明journalnode服务器存储目录-->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/module/hadoop-2.7.2/data/jn</value>
</property>

<!-- 关闭权限检查-->
<property>
<name>dfs.permissions.enable</name>
<value>false</value>
</property>

<!-- 访问代理类:client,mycluster,active配置失败自动切换实现方式-->
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

上面配置中配置了两个NN 节点,分别在 hadoop10 和 hadoop11 机器上,分别配置了两个节点的 RPC 地址和 HTTP 地址,配置了 Journal 服务所在的位置等。

启动过程

1. 在所有机器上启动 JournalNode

1
sh xcall hadoop-daemons.sh start journalnode

查看状态

1
sh xcall jps

2. 格式化 NN,将格式化后的 fsimage 文件同步到其他 NN 节点,启动所有 NN,将其中一个 NN 的状态转换为 active 状态

上面配置了两个 NN,格式化 hadoop10 上的 NN 并格式化

1
2
hadoop namenode -format  # 格式化 hadoop 10 上的 NN
hadoop-daemon.sh start namenode # 启动 hadoop 10 上的 NN

在 hadoop11 上同步 hadoop 10 上的数据,包括 fsimage 文件等

1
2
hdfs namenode -bootstrapStandby   # 在 hadoop11 上同步 hadoop 10 上的数据
hadoop-daemon.sh start namenode # 启动 hadoop11 上的 namenode

启动 nn1 和 nn2 的 datanode

1
sh xcall hadoop-daemons.sh start datanode

访问 http://hadoop11:50070/http://hadoop10:50070/ 两个 nn 的 namenode web 地址,两者都显示为 standby

手动将某个 nn 节点修改为 active 状态

1
hdfs haadmin -transitionToActive nn1  # 手动将 nn1 节点状态改为 active

再次访问 http://hadoop10:50070/ 就能看到状态从 standby 变成了 active。

3. 文件上传测试

上传一个文件到 hdfs 中,因为只有 nn1 是 active 的,所以只有 nn1 提供服务。在 web 页面中只有 nn1 可以看到上传的文件信息。如果手动将 nn1 状态修改为 standby,将 nn2 状态修改为 active,那么就只有 nn2 可以看到上传的文件信息而 nn1 则不可以。

1
2
hdfs haadmin -transitionToStandby nn1  # 将 nn1 切换为 Standby
hdfs haadmin -transitionToActive nn2 # 将 nn2 切换为 Active
手动故障转移

在上面的过程中 nn2 已经成为了 active 的状态,现在手动杀死 nn2 的 namenode 进程。因为 nn2 是 actice,且已经被杀死了,所以现在是无法正常提供服务的。

1
2
jps  # 先获取 namenode 进程号
kill -9 3596 # 结束 namenode 进程

强制将 nn1 的状态修改为 Active

1
hdfs haadmin -transitionToActive --forceactive nn1  # 强制将 nn1 修改为 Active

修改成功后 hdfs 就能正常的提供服务了

自动故障转移

自动故障转移为 HDFS 部署增加了两个新组件:ZooKeeper 和 ZKFailoverController(ZKFC)进程。ZKFC 使用一个健康检查命令定期地 ping 与之在相同主机的 NameNode,只要该 NameNode 及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。并且其他 NameNode 的 ZKFC 进程在 ZooKeeper 抢夺分布式锁,抢到的则成为 Active 状态。

为了防止脑裂情况的发生,hadoop HDFS 提供了两种解决方法,一种是配置 ssh 发送 kill 命令,即其他机器会通过 ssh 的方式来给你发送 kill 命令,防止你是假死。另外一种是自己配置一个脚本,当自己的 ZKFC 进程检测到自己处于不健康的状态时,那么就调用最的脚本将自己杀死。

自动故障转移配置

在 hdfs-site.xml 中配置自动故障转移

1
2
3
4
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

在 core-site.xml 中配置 zk 的集群地址信息

1
2
3
4
<property>
<name>ha.zookeeper.quorum</name>
<value>hadoop10:2181,hadoop11:2181,hadoop12:2181</value>
</property>

分发修改的两个文件,然后启动 zk 服务

1
2
sh xcall /opt/module/apache-zookeeper-3.6.1-bin/bin/zkServer.sh start   # 启动
sh xcall /opt/module/apache-zookeeper-3.6.1-bin/bin/zkServer.sh status # 查看状态

初始化 HA 在 Zookeeper 中状态(其实就是在 zk 中新增一个 znode 信息,下面存放 hadoop ha 的信息)

1
hdfs zkfc -formatZK

启动 hdfs 服务

1
start-dfs.sh

查看 http://hadoop11:50070/http://hadoop10:50070/ 发现 hadoop11 成为了active

模拟故障,将 hadoop11 namenode 进程杀死,然后发现 hadoop10 自动成为了 active 状态。

YARN HA

在 yarn-site.xml 中增加下面配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    <!--启用resourcemanager ha-->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>

<!--声明两台resourcemanager的地址-->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster-yarn1</value>
</property>

<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>

<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>hadoop10</value>
</property>

<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>hadoop11</value>
</property>

<!--指定zookeeper集群的地址-->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>hadoop10:2181,hadoop11:2181,hadoop12:2181</value>
</property>

<!--启用自动恢复-->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>

<!--指定resourcemanager的状态信息存储在zookeeper集群-->
<property>
<name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

上面配置中声明了两台 resourcemanager 的地址分别是 hadoop10 和 hadoop11。还配置了 zookeeper 集群的地址,配置了自动恢复等。

在 hadoop10 和 hadoop11 上启动 rm

1
2
yarn-daemon.sh start resourcemanager  # hadoop10 上执行
yarn-daemon.sh start resourcemanager # hadoop11 上执行

访问 http://hadoop10:8088http://hadoop11:8088 会发现 http://hadoop11:8088 会跳转到 http://hadoop10:8088。所以 hadoop10 上的 Yarn 就变成了 active 对外服务。

将 hadoop10 上的 Yarn 进程杀死会发现只有访问 http://hadoop11:8088 才能成功,因为 http://hadoop11:8088 成为了 active。若再次将 hadoop10 上的 yarn 重新启动后,访问 hadoop10 会跳转到 hadoop11.

压缩

目的和原则

压缩的目的:压缩的目的是在 MR 运行期间,提高 MR 运行的效率,压缩可以减少 MR 运行期间的磁盘IO 和网络IO。

压缩的原则:IO 密集型,多用压缩。计算密集型,CPU 负载过重,少用压缩。

Hadoop 默认支持的压缩格式有 deflate, bzip2, gzip。需要额外安装的有 lzo, snappy。特点是 bzip2 压缩比最高,压缩速度最慢。snappy 压缩速度最快,压缩比凑合。deflate,gzip 折中。

常用配置

压缩常用配置项如下:

1
2
3
4
5
6
io.compression.codecs: 代表整个Job运行期间,可以使用哪些压缩格式,配置这个参数后,配置的压缩格式会被自动初始化,默认值 deflate,gzip,bzip2
mapreduce.map.output.compress: map阶段输出的key-value是否采用压缩,默认值 false
mapreduce.map.output.compress.codec: map阶段输出的key-value采用何种压缩,默认值 deflate
mapreduce.output.fileoutputformat.compress: job在reduce阶段最终的输出是否采用压缩, 默认值 false
mapreduce.output.fileoutputformat.compress.codec: job在reduce阶段最终的输出采用何种压缩,默认值deflate
mapreduce.output.fileoutputformat.compress.type: 如果Job输出的文件以SequenceFile格式,SequenceFile 中的数据,要以何种形式进行压缩。NONE:是否压缩及如何压缩取决于操作系统,RECORD(默认):每个key-value对作为一个单位,压缩一次。BLOCK:SequenceFile中的block,SequenceFile中的block默认为64K,每个block压缩一次!

压缩场景

什么时候需要考虑压缩:

  1. Mapper 的输入: 主要考虑每个文件的大小,如果文件过大,需要使用可以切片的压缩格式。
  2. Reducer 的输出: reducer 的输出主要考虑,输出之后,是否需要下一个 Job 继续处理,如果需要被下个 Job 继续处理,且单个文件过大,也要使用可以切片的压缩格式。
  3. shuffle阶段:能加速即可

调度器

FIFO调度器

FIFO 调度器的特点就是单队列,所有的 Job 按照客户端提交的先后顺序,先到先服务。弊端是如果当前队列中有一个大的 Job,非常消耗资源,那么这个 Job 之后的其他 Job 都需要付额外的等待时间。造成集群的资源利用率不足。

容量调度器

容量调度器的本质是多个 FIFO 的队列组成,Hadoop 默认使用就是容量调度器。

特点是每个队列可以配置一定的容量,空闲的资源可以匀给其他队列临时使用。可以配置每个job使用的容量的限制,防止一个大的 job 独占所有资源。可以配置每个用户可以使用的容量限制,防止当个用户占用所有资源。

公平调度器

公平调度器的设置和容量调度器大致相同,也是多条队列,每天队列都可以设置一定的容量,每个 Job,用户可以设置容量。区别在于公平调度器在调度策略上,采用最大最小公平算法,来调度 Job,这个算法会保证同一个队列中,所有已经提交,未运行结束的 Job,获取到队列中的资源是平等的。

Hadoop的优化

小文件的优化

源头上处理,在上传到集群之前,提前处理小文件
小文件已经在 HDFS 存在,可以使用 hadoop archieve 进行归档
在运行 MR 时,可以使用 CombineTextInputFormat 将多个小文件规划到一个切片中
小文件过多,可以开启 JVM 重用

MR 的优化

合理设置 MapTask 和 ReduceTask 的数量

避免数据倾斜。如果 Map 端的数据发生倾斜,那么在切片时,注意每片数据尽量均匀,防止有些不可切片的数据。Reduce 端的数据倾斜,提前对数据进行抽样调查,统计出大致的分布范围,根据分布范围,合理编写Partitioner,让每个分区的数据尽量均衡。

优化磁盘 IO 和网络 IO。可以启用 combiner。启动压缩。调大 MapTask 缓冲区的大小,减少溢写次数。调大MapTask 中 merge 阶段一次合并的片段数,减少合并花费的时间。调大 reduceTask 中 shuffle 线程可以使用的内存,减少溢写次数。调大 reduceTask 中,input.buffer 的大小,提前缓存部分数据到 buffer 中。